分布式定时任务框架Quartz总结和实践(2) 您所在的位置:网站首页 任务调度框架quartz 调用微服务 分布式定时任务框架Quartz总结和实践(2)

分布式定时任务框架Quartz总结和实践(2)

2024-03-11 02:26| 来源: 网络整理| 查看: 265

本文主要介绍分布式定时任务框架Quartz集成SpringBoot持久化数据到Mysql数据库的操作,上一篇文章使用Quartz创建定时任务都是保存在内存中,如果服务重启定时任务就会失效,所以Quartz官方也提供将定时任务等信息持久化到Mysql数据库的功能,本文主要实现这种Quartz的这种使用方式。

[TOC]

一、概述

Quartz提供两种基本作业存储类型,包括将任务数据保存在内存中的RAMJobStore模式和保存在数据库中的JDBC作业存储模式,上篇说明了RAMJobStore 模式,本篇说明JDBC作业存储。

RAMJobStore :RAM也就是内存,默认情况下Quartz会将任务调度存在内存中,这种方式性能是最好的,因为内存的速度是最快的。不好的地方就是数据缺乏持久性,但程序崩溃或者重新发布的时候,所有运行信息都会丢失JDBC作业存储:存到数据库之后,可以做单点也可以做集群,当任务多了之后,可以统一进行管理。关闭或者重启服务器,运行的信息都不会丢失。缺点就是运行速度快慢取决于连接数据库的快慢。 二、功能说明

实现功能:

实现将Quartz的任务持久化到Mysql中,当服务重启后也能重写执行之前的任务;实现通过restful API能够增删改查Quartz中执行的任务;

主要依赖:

springbootquartzmybatis:ORM组件Druid:数据库连接池mysql 8.0:数据库 三、实现步骤 1.配置POM文件

配置需要引入的依赖项,涉及Quartz的是:quartz-jobs、spring-boot-starter-quartz

4.0.0 org.springframework.boot spring-boot-starter-parent 2.6.2 com.example QuartzDemo 0.0.1-SNAPSHOT QuartzDemo Demo project for Spring Boot 1.8 com.alibaba fastjson 1.2.4 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-jdbc org.mybatis.spring.boot mybatis-spring-boot-starter 2.2.1 org.projectlombok lombok true mysql mysql-connector-java runtime org.springframework.boot spring-boot-starter-test test org.quartz-scheduler quartz-jobs 2.2.1 org.springframework.boot spring-boot-starter-quartz com.alibaba druid-spring-boot-starter 1.1.10 org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok org.mybatis.generator mybatis-generator-maven-plugin 1.3.2 /Users/yangnk/IdeaProjects/SpringBoot-Learning/QuartzDemo/src/main/resources/generatorConfig.xml true mysql mysql-connector-java 8.0.12 2.配置application.yml文件

选用Druid做数据库连接池,需要对连接池进行配置。

server: port: 9000 #数据库连接池druid配置 spring: datasource: #1.JDBC type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://x.x.x.x:3306/quartz?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true username: root password: xxxxxx druid: #2.连接池配置 #初始化连接池的连接数量 大小,最小,最大 initial-size: 5 min-idle: 5 max-active: 20 #配置获取连接等待超时的时间 max-wait: 60000 #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 time-between-eviction-runs-millis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 min-evictable-idle-time-millis: 30000 validation-query: SELECT 1 FROM DUAL test-while-idle: true test-on-borrow: true test-on-return: false # 是否缓存preparedStatement,也就是PSCache 官方建议MySQL下建议关闭 个人建议如果想用SQL防火墙 建议打开 pool-prepared-statements: true max-pool-prepared-statement-per-connection-size: 20 # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 filter: stat: merge-sql: true slow-sql-millis: 5000 #3.基础监控配置 web-stat-filter: enabled: true url-pattern: /* #设置不统计哪些URL exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" session-stat-enable: true session-stat-max-count: 100 stat-view-servlet: enabled: true url-pattern: /druid/* reset-enable: true #设置监控页面的登录名和密码 login-username: admin login-password: admin allow: 127.0.0.1 mybatis: mapper-locations: classpath*:mapper/*.xml type-aliases-package: com.example.quartzdemo.model logging: level: com.example.quartzdemo.mapper : debug 3.配置quartz.properties配置

其中org.quartz.dataSource.qzDS.connectionProvider.class配置项是Druid连接池的Quartz扩展类,该扩展类是需要implements ConnectionProvider,这样在Quartz中才能使用Druid连接池。

其中org.quartz.jobStore.isClustered参数是配置是否开启集群模式,如果开启则配置:ture,如果未开启则配置:false,我们用单机实现所以配置:false。

#============================================================================ # Configure Main Scheduler Properties \u8C03\u5EA6\u5668\u5C5E\u6027 #============================================================================ org.quartz.scheduler.instanceName: DefaultQuartzScheduler org.quartz.scheduler.instanceId = AUTO org.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount= 10 org.quartz.threadPool.threadPriority: 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true org.quartz.jobStore.misfireThreshold: 60000 #============================================================================ # Configure JobStore #============================================================================ #\u5B58\u50A8\u65B9\u5F0F\u4F7F\u7528JobStoreTX\uFF0C\u4E5F\u5C31\u662F\u6570\u636E\u5E93 org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate #\u4F7F\u7528\u81EA\u5DF1\u7684\u914D\u7F6E\u6587\u4EF6 org.quartz.jobStore.useProperties:true #\u6570\u636E\u5E93\u4E2Dquartz\u8868\u7684\u8868\u540D\u524D\u7F00 org.quartz.jobStore.tablePrefix:qrtz_ org.quartz.jobStore.dataSource:qzDS #\u662F\u5426\u4F7F\u7528\u96C6\u7FA4\uFF08\u5982\u679C\u9879\u76EE\u53EA\u90E8\u7F72\u5230 \u4E00\u53F0\u670D\u52A1\u5668\uFF0C\u5C31\u4E0D\u7528\u4E86\uFF09 org.quartz.jobStore.isClustered = false #============================================================================ # Configure Datasources #============================================================================ #\u914D\u7F6E\u6570\u636E\u5E93\u6E90 org.quartz.dataSource.qzDS.connectionProvider.class: com.example.quartzdemo.demo.DruidConnectionProvider org.quartz.dataSource.qzDS.driver: com.mysql.cj.jdbc.Driver org.quartz.dataSource.qzDS.URL: jdbc:mysql://x.x.x.x:3306/quartz?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true org.quartz.dataSource.qzDS.user: root org.quartz.dataSource.qzDS.password: xxxxxx org.quartz.dataSource.qzDS.maxConnection: 10

Druid连接池的Quartz扩展类

import com.alibaba.druid.pool.DruidDataSource; import org.quartz.SchedulerException; import org.quartz.utils.ConnectionProvider; import java.sql.Connection; import java.sql.SQLException; /** * Druid连接池的Quartz扩展类 */ public class DruidConnectionProvider implements ConnectionProvider { /* * 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。 */ //JDBC驱动 public String driver; //JDBC连接串 public String URL; //数据库用户名 public String user; //数据库用户密码 public String password; //数据库最大连接数 public int maxConnection; //数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。 public String validationQuery; private boolean validateOnCheckout; private int idleConnectionValidationSeconds; public String maxCachedStatementsPerConnection; private String discardIdleConnectionsSeconds; public static final int DEFAULT_DB_MAX_CONNECTIONS = 10; public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120; //Druid连接池 private DruidDataSource datasource; /* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * * 接口实现 * * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ @Override public Connection getConnection() throws SQLException { return datasource.getConnection(); } @Override public void shutdown() throws SQLException { datasource.close(); } @Override public void initialize() throws SQLException{ if (this.URL == null) { throw new SQLException("DBPool could not be created: DB URL cannot be null"); } if (this.driver == null) { throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!"); } if (this.maxConnection < 0) { throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!"); } datasource = new DruidDataSource(); try{ datasource.setDriverClassName(this.driver); } catch (Exception e) { try { throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e); } catch (SchedulerException e1) { } } datasource.setUrl(this.URL); datasource.setUsername(this.user); datasource.setPassword(this.password); datasource.setMaxActive(this.maxConnection); datasource.setMinIdle(1); datasource.setMaxWait(0); datasource.setMaxPoolPreparedStatementPerConnectionSize(this.DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION); if (this.validationQuery != null) { datasource.setValidationQuery(this.validationQuery); if(!this.validateOnCheckout) datasource.setTestOnReturn(true); else datasource.setTestOnBorrow(true); datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds); } } /* * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * * 提供get set方法 * * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ public String getDriver() { return driver; } public void setDriver(String driver) { this.driver = driver; } public String getURL() { return URL; } public void setURL(String URL) { this.URL = URL; } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getMaxConnection() { return maxConnection; } public void setMaxConnection(int maxConnection) { this.maxConnection = maxConnection; } public String getValidationQuery() { return validationQuery; } public void setValidationQuery(String validationQuery) { this.validationQuery = validationQuery; } public boolean isValidateOnCheckout() { return validateOnCheckout; } public void setValidateOnCheckout(boolean validateOnCheckout) { this.validateOnCheckout = validateOnCheckout; } public int getIdleConnectionValidationSeconds() { return idleConnectionValidationSeconds; } public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) { this.idleConnectionValidationSeconds = idleConnectionValidationSeconds; } public DruidDataSource getDatasource() { return datasource; } public void setDatasource(DruidDataSource datasource) { this.datasource = datasource; } } 4.在数据库中创建quartz相关的表

进入quartz的官网Quartz Enterprise Job Scheduler,点击Downloads,也可以直接去文章末尾直接下载官方提供的Quartz相关表。

也可以自定义业务表,我们定义了t_schedule_trigger、t_schedule_trigger_param,具体表功能可以参考总结部分。

5.自定义MyJobFactory

自定义MyJobFactory的作用主要是为了解决在Spring中需要注入Quartz的bean实例,由于是将任务bean的全限定名信息保存在数据库中,Job对象的实例化过程是在Quartz中进行的,需要注入的实体类是在Spring容器当中的,所以在job中无法注入Srping容器的实体类。

Job的创建都是通过JobFactory创建的。JobFactory 有2个实现类:AdaptableJobFactory 和 SimpleJobFactory,可以通过AdaptableJobFactory类来解决:

自定义的工厂类 JobFactory 继承 AdaptableJobFactory 。通过调用父类 AdaptableJobFactory 的方法createJobInstance来实现对Job的实例化。在Job实例化完以后,再调用自身方法为创建好的Job实例进行属性自动装配并将其纳入到Spring容器的管理之中。(通过AutowireCapableBeanFactory纳入)。 import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.scheduling.quartz.AdaptableJobFactory; import org.springframework.stereotype.Component; /** * @description:自定义MyJobFactory,解决spring不能在quartz中注入bean的问题 */ @Component public class MyJobFactory extends AdaptableJobFactory { /** * @description:这个对象Spring会帮我们自动注入进来 */ @Autowired private AutowireCapableBeanFactory autowireCapableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); //通过以下方式,解决Job任务无法使用Spring中的Bean问题 autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } 创建调度器schedule

在配置类中创建schedule。

import org.quartz.Scheduler; import org.springframework.beans.factory.config.PropertiesFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import javax.annotation.Resource; import java.io.IOException; import java.util.Properties; /** * @description:quartz配置类,将调度器交给spring管理 */ @Configuration public class QuartzConfigration { @Resource private MyJobFactory myJobFactory; @Bean public Scheduler scheduler(){ return this.getSchedulerFactoryBean().getScheduler(); } @Bean public SchedulerFactoryBean getSchedulerFactoryBean(){ //1.创建SchedulerFactoryBean SchedulerFactoryBean sc = new SchedulerFactoryBean(); //2.加载自定义的quartz.properties sc.setQuartzProperties(this.getProperties()); //3.设置自定义的MyJobFactory sc.setJobFactory(myJobFactory); return sc; } /** * @description:读取自定义的properties文件 * @author: jie * @time: 2022/1/15 18:24 */ public Properties getProperties(){ try { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); //设置自定义配置文件的位置 propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); //读取配置文件 propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e); } } } 6.创建自定义任务

Quartz中自定义任务需要implements Job。

import lombok.extern.slf4j.Slf4j; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.stereotype.Component; import java.util.Date; @Component @Slf4j public class MyJob1 implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { //在日志中打印以下语句 log.info("job1 ="+new Date().toLocaleString()); JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); for (String s : jobDataMap.keySet()) { System.out.println(s); } } } 7.生成数据库访问相关文件

通过mybatis-generator-maven-plugin插件生成XML文件、Mapper文件、Model文件。

8.增加quartz中的任务Service层方法

主要实现两部分功能,一部分是通过 @Scheduled设置定时任务,定时同步数据库中的任务信息,如果有新增的任务则加入到调度器中,如果有删除的任务则从调度器中删除。

另一部分是提供新增和删除任务的逻辑,通过restful API的形式可以动态的向数据库中新增和删除任务。

public interface IScheduleService { /** * 定时更新任务 */ public void refresh(); /** * 创建新任务 * @param triggerVo */ public void addTrigger(TriggerVo triggerVo); /** * 停止任务 * * @param jobName * @param jobGroup */ public void deleteTrigger(String jobName, String jobGroup); } import com.example.quartzdemo.mapper.TScheduleTriggerMapper; import com.example.quartzdemo.mapper.TScheduleTriggerParamMapper; import com.example.quartzdemo.model.TScheduleTrigger; import com.example.quartzdemo.model.TScheduleTriggerParam; import lombok.extern.slf4j.Slf4j; import org.quartz.*; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; @Service @Slf4j public class ScheduleServiceImpl implements IScheduleService { /** * @description: * @author: 调度器 * @time: 2022/1/15 19:15 */ @Resource private Scheduler scheduler; @Resource private TScheduleTriggerMapper scheduleTriggerMapper; @Resource private TScheduleTriggerParamMapper scheduleTriggerParamMapper; public void deleteTrigger(String jobName, String jobGroup) { try { scheduleTriggerMapper.deleteByJobName(jobName); scheduler.deleteJob(new JobKey(jobName)); } catch (SchedulerException e) { e.printStackTrace(); } } public void addTrigger(TriggerVo triggerVo) { int id = (int) (System.currentTimeMillis() % Integer.MAX_VALUE); TScheduleTrigger trigger = new TScheduleTrigger(); trigger.setCron(triggerVo.getCron()); trigger.setJobGroup(triggerVo.getJobGroup()); trigger.setJobName(triggerVo.getJobName()); trigger.setId(id); trigger.setStatus("1"); scheduleTriggerMapper.insert(trigger); HashMap allMap = triggerVo.getMap(); JobDataMap jobDataMap = new JobDataMap(); for (Map.Entry entry : allMap.entrySet()) { TScheduleTriggerParam tScheduleTriggerParam = new TScheduleTriggerParam(); tScheduleTriggerParam.setScheduleTriggerId(id); tScheduleTriggerParam.setName(entry.getKey()); tScheduleTriggerParam.setValue(entry.getValue()); scheduleTriggerParamMapper.insert(tScheduleTriggerParam); jobDataMap.put(entry.getKey(),entry.getValue()); } try { //设置TriggerKey String jobName = triggerVo.getJobName(); String jobGroup = triggerVo.getJobGroup(); String cron = triggerVo.getCron(); //创建触发器 CronTrigger cronTrigger1 = TriggerBuilder.newTrigger() .withIdentity(jobName, jobGroup) .withSchedule(CronScheduleBuilder.cronSchedule(cron)) .build(); //创建工作详情实例 JobDetail jobDetail = JobBuilder.newJob((Class


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有